WorkflowsでStorage Transfer Service用コネクタを用いてCloud Storageバケット間のファイル転送を行う

WorkflowsでStorage Transfer Service用コネクタを用いてCloud Storageバケット間のファイル転送を行う

Clock Icon2024.11.02

概要

数あるWorkflowsのコネクタの中でも今回取り上げるのはStorage Transfer Service(以下STS)のコネクタです。
サンプルをベースにSTSコネクタを用いてWorkflowsでSTSジョブを作成し、作成したSTSジョブを起動してCloud Storageバケット間のオブジェクト移動を試してみました。

以下がSTSコネクタのサンプルです。
https://cloud.google.com/workflows/docs/samples/workflows-connector-storagetransfer?hl=ja

もしWorkflowsからのSTS操作に興味があれば読んでみてください。

やってみる

STSジョブの作成

まずSTSジョブの作成をWorkflowsからSTSコネクタで行います。
ジョブの作成は以下のコネクタです。

Method: googleapis.storagetransfer.v1.transferJobs.create

https://cloud.google.com/workflows/docs/reference/googleapis/storagetransfer/v1/transferJobs/create

Cloud Storage間のオブジェクトコピーを行うジョブの作成は以下のyamlとなります。

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - transfer_job_name: "transferJobs/testjob"
      - src_bucket_name: "コピー元バケット名"
      - sink_bucket_name: "コピー先バケット名"
- create_transfer_job:
    call: googleapis.storagetransfer.v1.transferJobs.create
    args:
      body:
        name: ${transfer_job_name}
        description: "Test JOB"
        projectId: ${project_id}
        transferSpec:
          gcsDataSink:
            bucketName: ${sink_bucket_name}
            path: ""
          gcsDataSource:
            bucketName: ${src_bucket_name}
            path: ""
          objectConditions:
            includePrefixes: ["test/"]
        status: "DISABLED"

設定値の説明は以下の表となります。

設定値 概要
name ジョブの名称。設定しない場合は一意の名前が自動で設定される。設定する場合はtransferJobs/で始まる必要がある
description ジョブの説明
projectId ジョブを作成するプロジェクトID
transferSpec 転送内容の詳細
gcsDataSink コピー先バケット
gcsDataSource コピー元バケット
ObjectConditions 転送対象をフィルタするためのプレフィックスの条件
status ジョブ作成時のステータス

ObjectConditionsincludePrefixesを設定することで特定の接頭辞だけを選択するように指定できます。除外する場合はexcludePrefixesを用います。
上記の実装ではincludePrefixestest/を指定していますのでtest/test1.txtのようにtest/がプレフィックスとなっているオブジェクトがコピー対象となります。

上記Workflowsを実行するとSTSのジョブが作成されます。
スクリーンショット 2024-11-02 0.03.48

フィルタ接頭辞で含めるincludePrefixesで指定した値が設定されていることや、コピー元・コピー先バケットが指定した通りになっていることが確認できると思います。また、statusDISABLEDで作成したのでジョブが無効になっている状態です。有効のまま作成する場合はENABLEDを指定します。

有効に設定するには以下のコネクタ(patch)を用います。

Method: googleapis.storagetransfer.v1.transferJobs.patch

このコネクタを用いてジョブのステータスを有効(ENABLED)にします。patch操作ではジョブに対して様々な更新ができます。ジョブの削除もこのコネクタで行います。

- enable_transfer_job:
   call: googleapis.storagetransfer.v1.transferJobs.patch
   args:
      jobName: ${transfer_job_name}
      body:
        projectId: ${project_id}
        transferJob:
          status: "ENABLED"

ジョブを削除する

削除する場合もpatchを用います。

Method: googleapis.storagetransfer.v1.transferJobs.patch

https://cloud.google.com/workflows/docs/reference/googleapis/storagetransfer/v1/transferJobs/patch

statusDELETEDを設定することでジョブが削除されます。

- delete_transfer_job:
    call: googleapis.storagetransfer.v1.transferJobs.patch
    args:
      jobName: ${transfer_job_name}
      body:
        projectId: ${project_id}
        transferJob:
          status: "DELETED"

ジョブを実行する

実行は以下のコネクタです。

Method: googleapis.storagetransfer.v1.transferJobs.run

https://cloud.google.com/workflows/docs/reference/googleapis/storagetransfer/v1/transferJobs/run

このコネクタは実行するだけなので引数もジョブ名・プロジェクトIDのみとシンプルなものです。

- run_transfer_job:
    call: googleapis.storagetransfer.v1.transferJobs.run
    args:
      jobName: ${transfer_job_name}
      body:
        projectId: ${project_id}
    result: run_result

ジョブ作成から削除まで

ジョブの作成から実行までのYAMLは以下となります。

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - transfer_job_name: "transferJobs/testjob"
      - src_bucket_name: "コピー元バケット名"
      - sink_bucket_name: "コピー先バケット名"
- create_transfer_job:
    call: googleapis.storagetransfer.v1.transferJobs.create
    args:
      body:
        name: ${transfer_job_name}
        description: "Test JOB"
        projectId: ${project_id}
        transferSpec:
          gcsDataSink:
            bucketName: ${sink_bucket_name}
            path: ""
          gcsDataSource:
            bucketName: ${src_bucket_name}
            path: ""
          objectConditions:
            includePrefixes: ["test/"]
        status: "DISABLED"
- enable_transfer_job:
   call: googleapis.storagetransfer.v1.transferJobs.patch
   args:
      jobName: ${transfer_job_name}
      body:
        projectId: ${project_id}
        transferJob:
          status: "ENABLED"
- run_transfer_job:
    call: googleapis.storagetransfer.v1.transferJobs.run
    args:
      jobName: ${transfer_job_name}
      body:
        projectId: ${project_id}
    result: run_result

ジョブを実行するとsrc_bucket_nameで指定したバケットにtest/のプレフィックスがついているオブジェクトがsink_bucket_nameにコピーされています。

Workflowsからスケジュール実行してみる

STSのスケジューラでの実行間隔の最短は1時間ですが、Workflows(Cloud Scheduler)の実行間隔は最短1分です。
つまりWorkflowsからSTSを実行するのようにしたらSTSの実行間隔をより高頻度にできるのではないかと思い試してみました。

STSジョブを実行するだけのワークフローを作成し、1分ごとに実行するスケジュール設定をします。
スクリーンショット 2024-11-02 1.18.21

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - transfer_job_name: "transferJobs/testsamplejob"
- run_transfer_job:
    call: googleapis.storagetransfer.v1.transferJobs.run
    args:
      jobName: ${transfer_job_name}
      body:
        projectId: ${project_id}
    result: run_result
- the_end:
    return: "SUCCESS"

STSジョブの実行履歴をみてみます。

開始時間 ステータス
2024年11月2日 1:18:03 UTC+9 成功
2024年11月2日 1:17:03 UTC+9 成功
2024年11月2日 1:16:06 UTC+9 成功
2024年11月2日 1:15:02 UTC+9 成功

実行結果のスクリーンショットは以下です。
スクリーンショット 2024-11-02 1.19.29

上記より1分おきに実行できていることが確認できました。
STSジョブを1時間より短い間隔で定期実行したい場合、Workflowsからの実行もアリだなと思います。

まとめ

ジョブの作成や削除はWorkflowsから行うことは少ないかもですが、実行や短い間隔でのスケジュール実行などはWorkflowsから行うこともワークロードによってはありかなと思います。
ワークフローでエラーが起きたファイルをまとめて移動する処理に使うというのもありかも。
いろいろ使い道はありそうだなと思います。

それではまた。ナマステー

参考

Storage Transfer Service 用コネクタ
Storage Transfer Service API Connector Overview

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.